{
 "metadata": {
  "name": ""
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "CS194-16: Introduction to Data Science\n",
      "\n",
      "__Name:__ *Please put your name*\n",
      "\n",
      "__Student ID:__ *Please put your student id*"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "# Homework 3\n",
      "\n",
      "# Part 2: Predicting Movie Ratings At Scale\n",
      "\n",
      "In the first part of this assignment, you explored how to recommend movies to a user using a small version of the movies and ratings datasets.  In this part of the assignment, you'll first explore how to tune the parallelism of an application running on a cluster.  Then, you'll use your findings to run your code from the first part on a cluster of machines and using a larger dataset.  You'll use the cluster and larger dataset to predict what movies to recommend to yourself.  For many parts of this assignment, you should be able to use the exact code that you wrote in part 1!\n",
      "\n",
      "<p style=\"background-color: #f2dede; padding: 10px 15px; color: #b94a48;\">__This version of the homework is for reference only!__  Each student has been emailed the URL and password for an iPython notebook server running on the cluster we've setup for you.  You should use the iPython notebook at that URL to complete your homework.  If you did not receive login information yet or if your login information doesn't work, please post a private note on Piazza ASAP so that we can resolve the issue. </p>\n",
      "\n",
      "As mentioned during the lab and in part 1, think carefully before calling `collect()` on any datasets.  When you're using a small, local dataset, calling `collect()` and then using Python to analyze the data locally will work fine, but this will not work when you're using a large dataset that doesn't fit in memory on one machine.  Solutions that call `collect()` and do local analysis that could have been done with Spark will not receive full credit.\n",
      "\n",
      "As in the first part, we have created a [FAQ](#FAQ) at the bottom of this page to help with common problems you run into.  If you run into a problem, please check the FAQ before posting on Piazza!"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Exercise 0: Setup\n",
      "\n",
      "As in the lab (and unlike part 1 of the homework), we've already created a `SparkContext` for you that's available with the `sc` variable name.  The code below prints out the URLs of the master UI and the UI for your application."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "# We've setup the notebook so that the hostname of the master is saved\n",
      "# as CLUSTER_URL.\n",
      "master_ui_address = \"\".join(CLUSTER_URL.split(\"//\")[1].split(\":\")[0])\n",
      "print \"Master UI located at http://%s:8080\" % master_ui_address\n",
      "\n",
      "application_ui_address = \"http://\" + sc.appName + \":4040\"\n",
      "print \"Application UI located at %s\" % application_ui_address"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "a) Use the `SparkContext` to create an RDD for each of the ratings and movies datasets, which are located at `/movielens/large/ratings.dat` and `/movielens/large/movies.dat`, respectively (as in the lab).  Recall that each entry in the ratings dataset is formatted as UserID::MovieID::Rating::Timestamp and each entry in the movies dataset is formatted as MovieID::Title::Genres.  Count the entries in each dataset to ensure you've correctly created the RDD; there should be 10000054 entries in the ratings dataset and 10681 entries in the movies dataset."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "ratings = # YOUR CODE HERE\n",
      "movies = # YOUR CODE HERE\n",
      "\n",
      "ratings_count = ratings.count()\n",
      "movies_count = movies.count()\n",
      "print \"%s ratings and %s movies in the datasets\" % (ratings_count, movies_count)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "b) As you did in part 1, use the least significant digit of the rating timestamp to separate 60% of the data into a training set, 20% into a validation set, and the remaining 20% into a test set."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "### YOUR CODE HERE\n",
      "training = # YOUR CODE HERE \n",
      "validation = # YOUR CODE HERE\n",
      "test = # YOUR CODE HERE\n",
      "\n",
      "print \"Training: %s, validation: %s, test: %s\" % (training.count(), validation.count(), test.count())"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Now the datasets are 10x bigger, so the training set should have about 6 million entries, the validation set should have about 2 million entries, and the test set should have about 2 million entries."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Exercise 1: Understanding Parallelism\n",
      "\n",
      "Before analyzing the movies datasets at scale, let's learn a little about how to optimize the performance.  In the lab, we saw that the number of partitions that a dataset is broken into can have a big affect on the response time of jobs run on that dataset.  In this part of the assignment, you'll determine the optimal number of partitions to break datasets into.\n",
      "\n",
      "a) In the next part of this exercise, you'll break the ratings dataset into different numbers of partitions, and then call `count()`.  First, answer some questions to understand how this will work.\n",
      "\n",
      "> __How does the number of tasks relate to the number of partitions?__ *Your answer here*\n",
      "\n",
      "b)\n",
      "> __When you call `count()`, what does each task do?__ *Your answer here*\n",
      "\n",
      "c)\n",
      "> __How do you expect the time to count the entries in the `ratings` dataset to vary with the number of partitions?__ *Your answer here (the goal here is to make a meaningful hypothesis -- it doesn't necessarily need to be correct)*"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "d) Create an RDD for the `ratings` dataset with different numbers of partitions, and call `count()` on the resulting datasets.  Using Spark and [matplotlib](http://matplotlib.org/index.html), make a graph showing the response time of `count()` as a function of the number of partitions.  You're responsible for choosing the number of partitions to use to produce a meaningful graph of how `count()`'s runtime depends on the number of partitions. \n",
      "\n",
      "`sc.textFile` will never allow you to use fewer than 2 partitions for the `ratings` dataset, even if you pass 1 as the number of partitions.  This is because the underlying dataset is stored in HDFS (Hadoop Distributed Filesystem) as two files.  As a result, you don't need to measure the time for `count()` using just 1 partition."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import matplotlib.pyplot as plot\n",
      "# Magic command to make matplotlib and ipython play nicely together.\n",
      "%matplotlib inline\n",
      "\n",
      "### YOUR CODE HERE"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "e)\n",
      ">__Explain the shape of the plot.  How does this differ from the hypothesis you provided in part (a)?  Can you explain the difference?__ *Your answer here*\n",
      "\n",
      "f)\n",
      ">__What is the optimal number of partitions for this dataset and why?__ *Your answer here*"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Exercise 2: Adding Your Rankings\n",
      "\n",
      "The ultimate goal of this assignment is to predict what movies to recommend to yourself.  In order to do that, you'll first need to add ratings for yourself to the `ratings` dataset.\n",
      "\n",
      "a) First, generate a unique user ID for yourself.  Find the highest user ID in the `ratings` dataset and use 1 + this ID as your user ID. "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "highest_existing_user_id = # YOUR CODE HERE\n",
      "\n",
      "my_user_id = highest_existing_user_id + 1\n",
      "print \"My user ID is %s\" % my_user_id"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "b) Next, you need to create a RDD with some ratings for yourself.  Create a new RDD `my_ratings_RDD` with at least 10 movie ratings.  Each entry should be formatted as `(my_user_id, movieID, rating)` (i.e., each entry should be formatted in the same way as the `training` RDD).  As in the original dataset, ratings should be between 1 and 5 (inclusive).\n",
      "\n",
      "To help you provide ratings for yourself, we've included some code below to list the names and movie IDs of the 50 most-rated movies (we did this in the lab; the code below is replicated from the lab).  Select 10 movies that you've seen from this list and use those (along with your own ratings for those movies) to produce the `my_ratings_RDD`.  If you haven't seen at least 10 of these movies, you can increase the parameter passed to `take()` until there are 10 movies that you have seen (or you can also guess what your rating would be for movies you haven't seen)."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "# Create a dataset of (movieID, number of ratings) pairs.\n",
      "ratings_per_movie = ratings.map(lambda x: (x[1], 1)).reduceByKey(lambda x,y: x+y)\n",
      "\n",
      "# Join average_ratings with movies to get a dataset with movie names and average ratings.\n",
      "ratings_with_names = movies.map(lambda x: (x[0], x[1])).join(ratings_per_movie)\n",
      "\n",
      "# map transforms ratings_with_names into an RDD where the key is the number of ratings\n",
      "# and the value is a 2-item tuple with (movie name, number of ratings). We reformat the\n",
      "# dataset in this way so that we can use sortByKey to get the most-rated movies.\n",
      "sorted_ratings = ratings_with_names.map(lambda x: (x[1][1], (x[1][0], x[0]))).sortByKey(False)\n",
      "print \"Most rated movies:\"\n",
      "for ratings_tuple in sorted_ratings.take(50):\n",
      "    print ratings_tuple"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "my_ratings_RDD = # YOUR CODE HERE\n",
      "\n",
      "# Remember that in general, you shouldn't use collect()!\n",
      "# We use collect here because we know that the RDD with your\n",
      "# ratings is small.\n",
      "print \"My movie ratings: \", my_ratings_RDD.collect()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "c) Now that you have ratings for yourself, you need to add your ratings to the `training` dataset so that the model you train will incorporate your preferences.  Spark's `union` method combines two RDDs (see documentation [here](http://spark.apache.org/docs/0.9.0/api/pyspark/pyspark.rdd.RDD-class.html#union)); use `union` to create a new training dataset that includes your ratings and the data in the original training dataset."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "training_with_my_ratings = # YOUR CODE HERE\n",
      "\n",
      "print (\"The training dataset now has %s more entries than the original training dataset\" % \n",
      "       (training_with_my_ratings.count() - training.count()))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The new training dataset should have at least 10 more entries than it had originally."
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Exercise 3: Building a Model\n",
      "\n",
      "a) In this exercise, you'll run your code from part 1 on the larger dataset.  First, we need to think a bit more carefully about how we do things to ensure using the larger dataset doesn't take too long on the cluster.  Use Spark's `repartition(numPartitions)` function (which re-partitions the dataset into the specified number of partitions) to break the `test`, `training_with_my_ratings`, and `validation` datasets into the optimal number of partitions that you found in part 1."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "### YOUR CODE HERE"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "b) As you did in part 1, train models with ranks of 4, 8, 12, and 16 using the `training_with_my_ratings` dataset (remember that you should use this dataset, which includes your `ratings`, instead of the original `training` dataset). Predict the ratings for the `validation` dataset, and use the `compute_error` function you wrote in part 1 to compute the error.  Which rank produces the best model, based on the error on the `validation` dataset?\n",
      "\n",
      "Now that you're running on a cluster, you'll want to think carefully about what datasets you cache in memory.  Remember that Spark only saves datasets in memory if you explicitly call `cache()` on the dataset, like we did in the lab.  You can use Spark's UI (located at port 4040 on the machine where this notebook is running) to see what datasets are cached in memory (you may want to use the `setName()` function to set a name for RDDs so they can easily be identified in the UI).  If you accidentally cache a dataset that you don't need, you can call `unpersist()` on the RDD to drop it from the cache."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from pyspark.mllib.recommendation import ALS\n",
      "\n",
      "best_rank = # YOUR CODE HERE\n",
      "\n",
      "print \"The best model was trained with rank %s\" % best_rank"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "c) \n",
      "> __Which datasets did you decide to cache in memory and why?__ Your answer here"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "d) Like you did in part 1, use the best model to predict the ratings on the `test` dataset, and compute the RMSE."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "test_rmse = # YOUR CODE HERE\n",
      "print \"The model had a RMSE on the test set of %s\" % test_rmse"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "e) Our solution code for part 1 got a RMSE of about 1.1 (subject to a small amount of variance, due to randomness in the ALS implementation) on the test set (you may want to use this to verify that your part 1 code was correct!).  How does this compare to the RMSE you got here?\n",
      "\n",
      ">__Is the test RMSE larger, smaller, or about the same as the RMSE (about 1.1) from part 1?  Why?__ Your answer here"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Exercise 4: Recommending Movies for Yourself\n",
      "\n",
      "a) So far, we've only used the `predictAll` method to compute the error of the model.  Here, use the `predictAll` to predict what ratings you'd give to the movies that you didn't already provide ratings for. Print out the 10 movies with the highest predicted ratings."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "predicted_10_highest_rated_movies = # YOUR CODE HERE\n",
      "\n",
      "# This should print a list of 10 movie names and the associated\n",
      "# predicted ratings.\n",
      "print \"My highest rated movies: \", predicted_10_highest_rated_movies"
     ],
     "language": "python",
     "metadata": {},
     "outputs": []
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "b)\n",
      "> __Are the predictions any good?  Do you think (or know!) you'd like these movies?__ Your answer here"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Wrapping Up\n",
      "\n",
      "Please answer the questions in [this form](https://docs.google.com/a/berkeley.edu/forms/d/15i39zp0awFpNE36ljh58mctJ-PhDir0roS4VfPJHPvw/viewform) to help us improve the assignments for this class in the future.\n",
      "\n",
      "<p style=\"background-color: #f2dede; padding: 10px 15px; color: #b94a48;\">__Don't forget to submit your assignment using glookup!__ We will not automatically collect the notebooks on the cluster; you need to copy the notebook back to your local machine and submit it yourself when you're finished with the homework.  To save your iPython notebook to your machine from the cluster, go to \"File\" > \"Download as\" > \"iPython\".</p>"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "<a name=\"FAQ\"></a>\n",
      "## Frequently Asked Questions\n",
      "\n",
      "### How do I save my iPython notebook that's hosted in the cluster?\n",
      "\n",
      "Go to \"File\" > \"Download as\" > \"iPython\".\n",
      "\n",
      "### How do I get to the UI?\n",
      "\n",
      "The UI for the Spark master is on port 8080, and the hostname is stored as part of the `CLUSTER_URL` variable pre-loaded into this notebook.  From the master UI, you can find the link to your application's UI.\n",
      "\n",
      "### What are all of the operations I can do on a Spark dataset (RDD)?\n",
      "\n",
      "[This Page](http://spark.apache.org/docs/0.9.0/api/pyspark/pyspark.rdd.RDD-class.html) lists all of the operations you can do on a Spark RDD.  Spark also has a Scala API (Scala is a programming language similar to Java); the [documentation for the Scala functions](http://spark.apache.org/docs/0.9.0/scala-programming-guide.html) is sometimes more helpful, and the Python functions work in the same way.\n",
      "\n",
      "### How do I use matplotlib?\n",
      "\n",
      "There are lots of good examples on the [matplotlib website](http://matplotlib.org/index.html).  For example, [this page](http://matplotlib.org/examples/pylab_examples/simple_plot.html) shows how to plot a single line.\n",
      "\n",
      "### Why am I getting an OutOfMemoryError?\n",
      "\n",
      "If you get an error that looks like: `org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space`, it probably means that you've tried to collect too much data on the machine where Python is running.  This is likely to happen if you do `collect()` on a large dataset.  To remedy this problem, you'll need to restart your iPython notebook (go to the main server, at port 8888 of the machine you were assigned, click \"Shutdown\" on your notebook, and then open it again) and don't do `collect()` on a large dataset.\n",
      "\n",
      "Curious why you're getting a Java error when your program is written in Python?  Spark is mostly written in Java (and Scala, a language built on top of Java).  We're using `pyspark` here, which uses a translation layer to translate between Python and Java.  Your Python `SparkContext` object is backed by a Java `SparkContext` object; all operations you run on Spark datasets are passsed through this Java object.  So, if you try to collect a result that's too large, the Java Virtual Machine that's running the Java `SparkContext` runs out of memory.\n",
      "\n",
      "### Python / Spark is giving me a crazy weird error!\n",
      "\n",
      "Spark is mostly written in Scala and Java, and the Python version of the code (\"pyspark\") hooks into the Java implementation in a way that can make error messages very difficult to understand.  If you get a hard-to-understand error when you run a Spark operation, we recommend first narrowing down the error so that you know exactly which operation caused the error.  For example, if `rdd.groupByKey().map(lambda x: x[1])` fails with an error, separate the `groupByKey()` and `map()` calls onto separate lines so you know which one is causing the error.  Next, double check the function signature to make sure you're passing the right arguments.  Pyspark can fail with a weird error if a RDD operation is given the wrong number or type of arguments.  If you're still stumped, try using `take(10)` to print out the first 10 entries in the dataset you're calling the RDD operation on.  Make sure the function you're calling and the arguments you're passing in make sense given the format of the input dataset.\n",
      "\n",
      "### I ran some code and nothing happened!\n",
      "\n",
      "Are you sure?  Some of the Spark operations will take a minute or so to run; look at the top of the iPython notebook to see if it says \"Kernel busy\".  If so, it's busy running your code.  Go checkout the Spark UI to see more about what's going on.\n",
      "\n",
      "<a name=\"faq_taking_forever\"></a>\n",
      "### My code is taking forever to run.  Did I do something wrong?\n",
      "\n",
      "Yes.  In our solution code, none of the Spark stages take more than about a minute to run (remember that you can see a job's stages by looking at the Spark UI).  If you ran something and it's taking forever, double check that you passed in the datasets you inteded to.  One common problem from part 1 was that people passed RDDs in the wrong format (e.g., with 3 items instead of just a key and a value) to `join`.  Use `take()` to look at the datasets passed into the offending function to make sure they're in the right format.\n",
      "\n",
      "If you end up with a job that's taking forever, you'll need to kill the job; otherwise it will hog all of your cluster resources such that you won't be able to run anything else.  Kill the job by shutting down this notebook (which will destroy your `SparkContext` and the associated worker process that's doing work) and then re-launching it.  You can shutdown your notebook by going to \"File\" > \"Close and Halt\" (but be sure to hit save first!).\n",
      "\n",
      "<a name=\"faq_no_predictions\"></a>\n",
      "### I'm not getting any predictions when I try to predict the ratings for myself!\n",
      "\n",
      "The model will only provide predictions for users who were included in the training set.  Make sure you included your own data when you trained the model.\n",
      "\n",
      "<a name=\"faq_high_ratings\"></a>\n",
      "### Some of the predicted ratings are bigger than 5!  Did I do something wrong?\n",
      "The ALS algorithm has no notion of bounds on expected ratings, so you may get predicted ratings that are larger than 5.  This does not necessarily signify a bug in your code."
     ]
    }
   ],
   "metadata": {}
  }
 ]
}